This commit alters Cargo's behavior when the `-vv` option is passed (two verbose
flags) to stream the output of all build scripts to the console. Cargo makes no
attempt to prevent interleaving or to indicate *which* build script is producing
output; it simply forwards all build script output to the console.
Cargo still acts as a middle-man, capturing the output so that it can still
parse build script output and interpret the results. The parsing is still
deferred until the build script completes, but the output is streamed to the
console while the build script is running.
On Unix this is implemented via `select` and on Windows this is implemented via
IOCP.
Closes #1106
"libc 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"libgit2-sys 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "miow 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 0.1.58 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
+[[package]]
+name = "cfg-if"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
[[package]]
name = "cmake"
version = "0.1.16"
"libc 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
+[[package]]
+name = "miow"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "kernel32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "net2 0.2.24 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "net2"
+version = "0.2.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "kernel32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
[[package]]
name = "nom"
version = "1.2.2"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
+[[package]]
+name = "ws2_32-sys"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "winapi 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
kernel32-sys = "0.2"
libc = "0.2"
log = "0.3"
+miow = "0.1"
num_cpus = "0.2"
regex = "0.1"
rustc-serialize = "0.3"
use std::path::{PathBuf, Path};
use std::str;
use std::sync::{Mutex, Arc};
+use std::process::{Stdio, Output};
use core::PackageId;
use util::{CargoResult, Human};
use util::{internal, ChainError, profile, paths};
-use util::Freshness;
+use util::{Freshness, ProcessBuilder, read2};
+use util::errors::{process_error, ProcessError};
use super::job::Work;
+use super::job_queue::JobState;
use super::{fingerprint, Kind, Context, Unit};
use super::CommandType;
try!(fs::create_dir_all(&cx.layout(unit.pkg, Kind::Host).build(unit.pkg)));
try!(fs::create_dir_all(&cx.layout(unit.pkg, unit.kind).build(unit.pkg)));
- let exec_engine = cx.exec_engine.clone();
-
// Prepare the unit of "dirty work" which will actually run the custom build
// command.
//
// Note that this has to do some extra work just before running the command
// to determine extra environment variables and such.
- let dirty = Work::new(move |desc_tx| {
+ let dirty = Work::new(move |state| {
// Make sure that OUT_DIR exists.
//
// If we have an old build directory, then just move it into place,
}
// And now finally, run the build command itself!
- desc_tx.send(p.to_string()).ok();
- let output = try!(exec_engine.exec_with_output(p).map_err(|mut e| {
+ state.running(&p);
+ let cmd = p.into_process_builder();
+ let output = try!(stream_output(state, &cmd).map_err(|mut e| {
e.desc = format!("failed to run custom build command for `{}`\n{}",
pkg_name, e.desc);
Human(e)
}
}
}
+
+/// Runs `cmd`, forwarding its stdout/stderr to `state` as complete lines
+/// become available, while also capturing the full output so the caller can
+/// parse it after the command finishes.
+///
+/// Partial lines are buffered until a newline (or EOF) arrives so that every
+/// forwarded chunk ends on a line boundary. Returns the captured `Output` on
+/// success, or a `ProcessError` if the child could not be spawned or exited
+/// with a non-success status.
+fn stream_output(state: &JobState, cmd: &ProcessBuilder)
+                 -> Result<Output, ProcessError> {
+    let mut stdout = Vec::new();
+    let mut stderr = Vec::new();
+
+    let status = try!((|| {
+        let mut cmd = cmd.build_command();
+        cmd.stdout(Stdio::piped())
+           .stderr(Stdio::piped())
+           .stdin(Stdio::null());
+        let mut child = try!(cmd.spawn());
+        let out = child.stdout.take().unwrap();
+        let err = child.stderr.take().unwrap();
+
+        try!(read2(out, err, &mut |is_out, data, eof| {
+            // Only forward up to (and including) the last newline unless
+            // we're at EOF, so streamed chunks end on line boundaries.
+            let idx = if eof {
+                data.len()
+            } else {
+                match data.iter().rposition(|b| *b == b'\n') {
+                    Some(i) => i + 1,
+                    None => return,
+                }
+            };
+            // Move the completed portion into the captured buffer, then
+            // stream exactly that portion (lossily decoded) to the console.
+            let data = data.drain(..idx);
+            let dst = if is_out {&mut stdout} else {&mut stderr};
+            let start = dst.len();
+            dst.extend(data);
+            let s = String::from_utf8_lossy(&dst[start..]);
+            if is_out {
+                state.stdout(&s);
+            } else {
+                state.stderr(&s);
+            }
+        }));
+        child.wait()
+    })().map_err(|e| {
+        let msg = format!("could not execute process {}", cmd);
+        process_error(&msg, Some(e), None, None)
+    }));
+    let output = Output {
+        stdout: stdout,
+        stderr: stderr,
+        status: status,
+    };
+    if !output.status.success() {
+        let msg = format!("process didn't exit successfully: {}", cmd);
+        Err(process_error(&msg, None, Some(&status), Some(&output)))
+    } else {
+        Ok(output)
+    }
+}
-use std::sync::mpsc::Sender;
use std::fmt;
use util::{CargoResult, Fresh, Dirty, Freshness};
+use super::job_queue::JobState;
pub struct Job { dirty: Work, fresh: Work }
/// Each proc should send its description before starting.
/// It should send either once or close immediately.
pub struct Work {
- inner: Box<FnBox<Sender<String>, CargoResult<()>> + Send>,
+ inner: Box<for <'a, 'b> FnBox<&'a JobState<'b>, CargoResult<()>> + Send>,
}
trait FnBox<A, R> {
impl Work {
pub fn new<F>(f: F) -> Work
- where F: FnOnce(Sender<String>) -> CargoResult<()> + Send + 'static
+ where F: FnOnce(&JobState) -> CargoResult<()> + Send + 'static
{
Work { inner: Box::new(f) }
}
Work::new(|_| Ok(()))
}
- pub fn call(self, tx: Sender<String>) -> CargoResult<()> {
+ pub fn call(self, tx: &JobState) -> CargoResult<()> {
self.inner.call_box(tx)
}
pub fn then(self, next: Work) -> Work {
- Work::new(move |tx| {
- try!(self.call(tx.clone()));
- next.call(tx)
+ Work::new(move |state| {
+ try!(self.call(state));
+ next.call(state)
})
}
}
/// Consumes this job by running it, returning the result of the
/// computation.
- pub fn run(self, fresh: Freshness, tx: Sender<String>) -> CargoResult<()> {
+ pub fn run(self, fresh: Freshness, state: &JobState) -> CargoResult<()> {
match fresh {
- Fresh => self.fresh.call(tx),
- Dirty => self.dirty.call(tx),
+ Fresh => self.fresh.call(state),
+ Dirty => self.dirty.call(state),
}
}
}
use std::collections::HashSet;
use std::collections::hash_map::HashMap;
use std::fmt;
+use std::io::Write;
use std::sync::mpsc::{channel, Sender, Receiver};
use crossbeam::{self, Scope};
use super::{Context, Kind, Unit};
use super::job::Job;
+use super::engine::CommandPrototype;
/// A management structure of the entire dependency graph to compile.
///
pub struct JobQueue<'a> {
jobs: usize,
queue: DependencyQueue<Key<'a>, Vec<(Job, Freshness)>>,
- tx: Sender<Message<'a>>,
- rx: Receiver<Message<'a>>,
+ tx: Sender<(Key<'a>, Message)>,
+ rx: Receiver<(Key<'a>, Message)>,
active: usize,
pending: HashMap<Key<'a>, PendingBuild>,
compiled: HashSet<&'a PackageId>,
kind: Kind,
}
-struct Message<'a> {
+pub struct JobState<'a> {
+ tx: Sender<(Key<'a>, Message)>,
key: Key<'a>,
- result: CargoResult<()>,
+}
+
+// Messages sent from worker threads back to the main `JobQueue` loop; each
+// is paired with the job's `Key` on the channel so the receiver knows which
+// job produced it.
+enum Message {
+ // A command (rendered as a string) has started running.
+ Run(String),
+ // A chunk of a running command's stdout, forwarded for `-vv` streaming.
+ Stdout(String),
+ // A chunk of a running command's stderr, forwarded for `-vv` streaming.
+ Stderr(String),
+ // The job has finished with the given result.
+ Finish(CargoResult<()>),
+}
+
+// Handle given to each running job so it can report progress back to the
+// main `JobQueue` loop over the shared channel.
+impl<'a> JobState<'a> {
+ // Announce that `cmd` is about to run; the main loop prints it as a
+ // `Running` status line when verbose output is enabled.
+ // NOTE(review): send errors are deliberately ignored (`let _`) —
+ // presumably the receiver may be gone during shutdown; confirm.
+ pub fn running(&self, cmd: &CommandPrototype) {
+ let _ = self.tx.send((self.key, Message::Run(cmd.to_string())));
+ }
+
+ // Forward a chunk of the job's stdout; the main loop writes it through
+ // to the console when extra-verbose (`-vv`) is enabled.
+ pub fn stdout(&self, out: &str) {
+ let _ = self.tx.send((self.key, Message::Stdout(out.to_string())));
+ }
+
+ // Forward a chunk of the job's stderr; handled like stdout but written
+ // to the error stream.
+ pub fn stderr(&self, err: &str) {
+ let _ = self.tx.send((self.key, Message::Stderr(err.to_string())));
+ }
+}
impl<'a> JobQueue<'a> {
// After a job has finished we update our internal state if it was
// successful and otherwise wait for pending work to finish if it failed
// and then immediately return.
+ let mut error = None;
loop {
- while self.active < self.jobs {
+ while error.is_none() && self.active < self.jobs {
if !queue.is_empty() {
let (key, job, fresh) = queue.remove(0);
try!(self.run(key, fresh, job, cx.config, scope));
break
}
- // Now that all possible work has been scheduled, wait for a piece
- // of work to finish. If any package fails to build then we stop
- // scheduling work as quickly as possibly.
- let msg = self.rx.recv().unwrap();
- info!("end: {:?}", msg.key);
- self.active -= 1;
- match msg.result {
- Ok(()) => {
- try!(self.finish(msg.key, cx));
+ let (key, msg) = self.rx.recv().unwrap();
+
+ match msg {
+ Message::Run(cmd) => {
+ try!(cx.config.shell().verbose(|c| c.status("Running", &cmd)));
+ }
+ Message::Stdout(out) => {
+ if cx.config.extra_verbose() {
+ try!(write!(cx.config.shell().out(), "{}", out));
+ }
}
- Err(e) => {
- if self.active > 0 {
- try!(cx.config.shell().say(
- "Build failed, waiting for other \
- jobs to finish...", YELLOW));
- for _ in self.rx.iter().take(self.active as usize) {}
+ Message::Stderr(err) => {
+ if cx.config.extra_verbose() {
+ try!(write!(cx.config.shell().err(), "{}", err));
+ }
+ }
+ Message::Finish(result) => {
+ info!("end: {:?}", key);
+ self.active -= 1;
+ match result {
+ Ok(()) => try!(self.finish(key, cx)),
+ Err(e) => {
+ if self.active > 0 {
+ try!(cx.config.shell().say(
+ "Build failed, waiting for other \
+ jobs to finish...", YELLOW));
+ }
+ if error.is_none() {
+ error = Some(e);
+ }
+ }
}
- return Err(e)
}
}
}
if self.queue.is_empty() {
Ok(())
+ } else if let Some(e) = error {
+ Err(e)
} else {
debug!("queue: {:#?}", self.queue);
Err(internal("finished with jobs still left in the queue"))
*self.counts.get_mut(key.pkg).unwrap() -= 1;
let my_tx = self.tx.clone();
- let (desc_tx, desc_rx) = channel();
scope.spawn(move || {
- my_tx.send(Message {
+ let res = job.run(fresh, &JobState {
+ tx: my_tx.clone(),
key: key,
- result: job.run(fresh, desc_tx),
- }).unwrap();
+ });
+ my_tx.send((key, Message::Finish(res))).unwrap();
});
// Print out some nice progress information
try!(self.note_working_on(config, &key, fresh));
- // only the first message of each job is processed
- if let Ok(msg) = desc_rx.recv() {
- try!(config.shell().verbose(|c| c.status("Running", &msg)));
- }
Ok(())
}
// In general, we try to print "Compiling" for the first nontrivial task
// run for a package, regardless of when that is. We then don't print
// out any more information for a package after we've printed it once.
- fn note_working_on(&mut self, config: &Config, key: &Key<'a>,
+ fn note_working_on(&mut self,
+ config: &Config,
+ key: &Key<'a>,
fresh: Freshness) -> CargoResult<()> {
if (self.compiled.contains(key.pkg) && !key.profile.doc) ||
(self.documented.contains(key.pkg) && key.profile.doc) {
let rustflags = try!(cx.rustflags_args(unit));
- return Ok(Work::new(move |desc_tx| {
+ return Ok(Work::new(move |state| {
// Only at runtime have we discovered what the extra -L and -l
// arguments are for native libraries, so we process those here. We
// also need to be sure to add any -L paths for our plugins to the
// Add the arguments from RUSTFLAGS
rustc.args(&rustflags);
- desc_tx.send(rustc.to_string()).ok();
+ state.running(&rustc);
try!(exec_engine.exec(rustc).chain_error(|| {
human(format!("Could not compile `{}`.", name))
}));
let key = (unit.pkg.package_id().clone(), unit.kind);
let exec_engine = cx.exec_engine.clone();
- Ok(Work::new(move |desc_tx| {
+ Ok(Work::new(move |state| {
if let Some(output) = build_state.outputs.lock().unwrap().get(&key) {
for cfg in output.cfgs.iter() {
rustdoc.arg("--cfg").arg(cfg);
}
}
- desc_tx.send(rustdoc.to_string()).unwrap();
+ state.running(&rustdoc);
exec_engine.exec(rustdoc).chain_error(|| {
human(format!("Could not document `{}`.", name))
})
}
pub fn internal<S: fmt::Display>(error: S) -> Box<CargoError> {
+ _internal(&error)
+}
+
+fn _internal(error: &fmt::Display) -> Box<CargoError> {
Box::new(ConcreteCargoError {
description: error.to_string(),
detail: None,
}
pub fn human<S: fmt::Display>(error: S) -> Box<CargoError> {
+ _human(&error)
+}
+
+fn _human(error: &fmt::Display) -> Box<CargoError> {
Box::new(ConcreteCargoError {
description: error.to_string(),
detail: None,
pub use self::to_semver::ToSemver;
pub use self::to_url::ToUrl;
pub use self::vcs::{GitRepo, HgRepo};
+pub use self::read2::read2;
pub mod config;
pub mod errors;
mod vcs;
mod lazy_cell;
mod flock;
+mod read2;
--- /dev/null
+pub use self::imp::read2;
+
+#[cfg(unix)]
+mod imp {
+ use std::cmp;
+ use std::io::prelude::*;
+ use std::io;
+ use std::mem;
+ use std::os::unix::prelude::*;
+ use std::process::{ChildStdout, ChildStderr};
+ use libc;
+
+ /// Drives both child pipes to EOF, invoking `data(is_stdout, buf, eof)`
+ /// on each iteration of a `select` loop so the caller can consume bytes
+ /// as they arrive. Newly-read bytes are appended to an internal buffer
+ /// that the callback is allowed to drain.
+ pub fn read2(mut out_pipe: ChildStdout,
+ mut err_pipe: ChildStderr,
+ mut data: &mut FnMut(bool, &mut Vec<u8>, bool)) -> io::Result<()> {
+ // Switch both pipes to non-blocking mode so a `read_to_end` below
+ // returns WouldBlock instead of stalling when a pipe runs dry.
+ unsafe {
+ libc::fcntl(out_pipe.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK);
+ libc::fcntl(err_pipe.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK);
+ }
+
+ let mut out_done = false;
+ let mut err_done = false;
+ let mut out = Vec::new();
+ let mut err = Vec::new();
+
+ let max = cmp::max(out_pipe.as_raw_fd(), err_pipe.as_raw_fd());
+ loop {
+ // wait for either pipe to become readable using `select`;
+ // pipes that already hit EOF are left out of the fd set
+ let r = unsafe {
+ let mut read: libc::fd_set = mem::zeroed();
+ if !out_done {
+ libc::FD_SET(out_pipe.as_raw_fd(), &mut read);
+ }
+ if !err_done {
+ libc::FD_SET(err_pipe.as_raw_fd(), &mut read);
+ }
+ libc::select(max + 1, &mut read, 0 as *mut _, 0 as *mut _,
+ 0 as *mut _)
+ };
+ if r == -1 {
+ // EINTR just means "retry the select"; anything else is fatal.
+ let err = io::Error::last_os_error();
+ if err.kind() == io::ErrorKind::Interrupted {
+ continue
+ }
+ return Err(err)
+ }
+
+ // Read as much as we can from each pipe, ignoring EWOULDBLOCK /
+ // EAGAIN (mapped to `Ok(false)` = "not done yet"). If we hit EOF
+ // the underlying reader returns Ok, in which case we'll see `Ok`
+ // ourselves and mark the pipe done. Note: both pipes simply stay
+ // non-blocking until EOF; nothing is switched back to blocking.
+ let handle = |res: io::Result<_>| {
+ match res {
+ Ok(_) => Ok(true),
+ Err(e) => {
+ if e.kind() == io::ErrorKind::WouldBlock {
+ Ok(false)
+ } else {
+ Err(e)
+ }
+ }
+ }
+ };
+ // Flush whatever arrived to the callback, flagging EOF so it can
+ // emit any trailing partial line.
+ if !out_done && try!(handle(out_pipe.read_to_end(&mut out))) {
+ out_done = true;
+ }
+ data(true, &mut out, out_done);
+ if !err_done && try!(handle(err_pipe.read_to_end(&mut err))) {
+ err_done = true;
+ }
+ data(false, &mut err, err_done);
+
+ if out_done && err_done {
+ return Ok(())
+ }
+ }
+ }
+}
+
+#[cfg(windows)]
+mod imp {
+ extern crate miow;
+ extern crate winapi;
+
+ use std::io;
+ use std::os::windows::prelude::*;
+ use std::process::{ChildStdout, ChildStderr};
+ use std::slice;
+
+ use self::miow::iocp::{CompletionPort, CompletionStatus};
+ use self::miow::pipe::NamedPipe;
+ use self::miow::Overlapped;
+ use self::winapi::ERROR_BROKEN_PIPE;
+
+ // In-flight state for one child pipe: the destination buffer we append
+ // into, the overlapped-I/O control block, the pipe handle itself, and
+ // whether this pipe has reached EOF.
+ struct Pipe<'a> {
+ dst: &'a mut Vec<u8>,
+ overlapped: Overlapped,
+ pipe: NamedPipe,
+ done: bool,
+ }
+
+ // NOTE(review): this macro shadows the standard `try!` for the whole
+ // module and prints every failure to *stdout* — this looks like leftover
+ // debugging output; consider dropping the `println!` (or the macro)
+ // before landing.
+ macro_rules! try {
+ ($e:expr) => (match $e {
+ Ok(e) => e,
+ Err(e) => {
+ println!("{} failed with {}", stringify!($e), e);
+ return Err(e)
+ }
+ })
+ }
+
+ /// IOCP-based equivalent of the Unix `read2`: both pipes are registered
+ /// with one completion port (token 0 = stdout, token 1 = stderr), reads
+ /// are issued as overlapped operations, and `data(is_stdout, buf, eof)`
+ /// is invoked each time a read completes.
+ pub fn read2(out_pipe: ChildStdout,
+ err_pipe: ChildStderr,
+ mut data: &mut FnMut(bool, &mut Vec<u8>, bool)) -> io::Result<()> {
+ let mut out = Vec::new();
+ let mut err = Vec::new();
+
+ let port = try!(CompletionPort::new(1));
+ try!(port.add_handle(0, &out_pipe));
+ try!(port.add_handle(1, &err_pipe));
+
+ unsafe {
+ let mut out_pipe = Pipe::new(out_pipe, &mut out);
+ let mut err_pipe = Pipe::new(err_pipe, &mut err);
+
+ // Kick off the initial overlapped read on each pipe.
+ try!(out_pipe.read());
+ try!(err_pipe.read());
+
+ let mut status = [CompletionStatus::zero(), CompletionStatus::zero()];
+
+ // Dispatch completions by token until both pipes report EOF,
+ // re-arming a fresh read after each completion.
+ while !out_pipe.done || !err_pipe.done {
+ for status in try!(port.get_many(&mut status, None)) {
+ if status.token() == 0 {
+ out_pipe.complete(status);
+ data(true, out_pipe.dst, out_pipe.done);
+ try!(out_pipe.read());
+ } else {
+ err_pipe.complete(status);
+ data(false, err_pipe.dst, err_pipe.done);
+ try!(err_pipe.read());
+ }
+ }
+ }
+
+ Ok(())
+ }
+ }
+
+ impl<'a> Pipe<'a> {
+ // Takes ownership of the raw handle and wraps it as a NamedPipe so
+ // miow's overlapped read API can be used on it.
+ unsafe fn new<P: IntoRawHandle>(p: P, dst: &'a mut Vec<u8>) -> Pipe<'a> {
+ Pipe {
+ dst: dst,
+ pipe: NamedPipe::from_raw_handle(p.into_raw_handle()),
+ overlapped: Overlapped::zero(),
+ done: false,
+ }
+ }
+
+ // Issues an overlapped read into the buffer's spare capacity;
+ // ERROR_BROKEN_PIPE means the child closed its end, i.e. EOF.
+ unsafe fn read(&mut self) -> io::Result<()> {
+ let dst = slice_to_end(self.dst);
+ match self.pipe.read_overlapped(dst, &mut self.overlapped) {
+ Ok(_) => Ok(()),
+ Err(e) => {
+ if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) {
+ self.done = true;
+ Ok(())
+ } else {
+ Err(e)
+ }
+ }
+ }
+ }
+
+ // Commits the bytes a completed read wrote into the spare capacity
+ // by bumping the vector's length; zero bytes transferred means EOF.
+ unsafe fn complete(&mut self, status: &CompletionStatus) {
+ let prev = self.dst.len();
+ self.dst.set_len(prev + status.bytes_transferred() as usize);
+ if status.bytes_transferred() == 0 {
+ self.done = true;
+ }
+ }
+ }
+
+ // Returns the (uninitialized) spare capacity of `v` as a writable slice,
+ // growing the vector first so the slice is never empty. The caller must
+ // account for written bytes via `set_len` (see `complete` above).
+ unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
+ if v.capacity() == 0 {
+ v.reserve(16);
+ }
+ if v.capacity() == v.len() {
+ v.reserve(1);
+ }
+ slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize),
+ v.capacity() - v.len())
+ }
+}
[RUNNING] `rustc build.rs --crate-name build_script_build --crate-type bin [..]`
[RUNNING] `[..]build-script-build[..]`
[ERROR] failed to run custom build command for `foo v0.5.0 ({url})`
-Process didn't exit successfully: `[..]build[..]build-script-build[..]` \
- (exit code: 101)",
+process didn't exit successfully: `[..]build-script-build[..]` (exit code: 101)",
url = p.url())));
}
assert_that(p.cargo_process("build").arg("-v"),
execs().with_status(0)
.with_stderr("\
+[COMPILING] foo v0.5.0 ([..])
+[RUNNING] `rustc [..]`
+[RUNNING] `[..]`
warning: foo
warning: bar
+[RUNNING] `rustc [..]`
"));
}
assert_that(p.cargo_process("build").arg("-v"),
execs().with_status(0)
- .with_stderr(""));
+ .with_stderr("\
+[UPDATING] registry `[..]`
+[DOWNLOADING] bar v0.1.0 ([..])
+[COMPILING] bar v0.1.0 ([..])
+[RUNNING] `rustc [..]`
+[RUNNING] `[..]`
+[RUNNING] `rustc [..]`
+[COMPILING] foo v0.5.0 ([..])
+[RUNNING] `rustc [..]`
+"));
}
#[test]
assert_that(p.cargo_process("build").arg("-vv"),
execs().with_status(0)
.with_stderr("\
+[UPDATING] registry `[..]`
+[DOWNLOADING] bar v0.1.0 ([..])
+[COMPILING] bar v0.1.0 ([..])
+[RUNNING] `rustc [..]`
+[RUNNING] `[..]`
warning: foo
warning: bar
+[RUNNING] `rustc [..]`
+[COMPILING] foo v0.5.0 ([..])
+[RUNNING] `rustc [..]`
+"));
+}
+
+// Build scripts' stdout must be forwarded to Cargo's stdout and their stderr
+// interleaved with Cargo's own status lines when `-vv` is passed.
+#[test]
+fn output_shows_on_vv() {
+ let p = project("foo")
+ .file("Cargo.toml", r#"
+ [project]
+ name = "foo"
+ version = "0.5.0"
+ authors = []
+ build = "build.rs"
+ "#)
+ .file("src/lib.rs", "")
+ .file("build.rs", r#"
+ use std::io::prelude::*;
+
+ fn main() {
+ std::io::stderr().write_all(b"stderr\n").unwrap();
+ std::io::stdout().write_all(b"stdout\n").unwrap();
+ }
+ "#);
+
+ assert_that(p.cargo_process("build").arg("-vv"),
+ execs().with_status(0)
+ .with_stdout("\
+stdout
+")
+ .with_stderr("\
+[COMPILING] foo v0.5.0 ([..])
+[RUNNING] `rustc [..]`
+[RUNNING] `[..]`
+stderr
+[RUNNING] `rustc [..]`
"));
}